Skip to content

SQL API Extensions: Expose planning APIs and make classes public#38951

Draft
damccorm wants to merge 11 commits into
apache:masterfrom
damccorm:feature/sql-api-extensions-only
Draft

SQL API Extensions: Expose planning APIs and make classes public#38951
damccorm wants to merge 11 commits into
apache:masterfrom
damccorm:feature/sql-api-extensions-only

Conversation

@damccorm

@damccorm damccorm commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Description

This PR is split from #38866. It focuses on exposing Beam SQL's planning and optimization infrastructure as an extensible API.

Previously, Beam SQL's planning stages (via Calcite) were mostly internal and tightly coupled to executing a full SQL string end-to-end. This PR refactors and exposes these planning stages to allow external orchestration of Beam SQL.

Key Changes

  • Exposed Planning APIs:
    • Added parseLogicalPlan(String query) / parseToRel(...) to BeamSqlEnv and QueryPlanner to allow parsing a SQL query string into a Calcite logical plan (RelNode) without immediately optimizing or executing it.
    • Added convertToBeamRel(RelNode logicalPlan) to allow taking an externally constructed or manipulated Calcite logical plan (RelNode) and converting it into a Beam physical plan (BeamRelNode / PCollection pipeline).
  • Extensibility Improvements:
    • Made BeamCalciteTable constructor public to allow external planners to instantiate it.
    • Made TextTableProvider.RowToCsv class public to allow external integration with text table serialization.
  • Testing:
    • Added a new comprehensive unit test testParseAndConvertHelpers in CalciteQueryPlannerTest.java that specifically exercises these new APIs end-to-end.

Why this is needed

This is a crucial feature for external query engines or orchestrators (such as Spark Connect or custom SQL platforms). They can now use Beam's SQL parser to get a logical plan, perform their own optimizations or integrations, and then hand it back to Beam to generate the final executable pipeline.

@gemini-code-assist

This comment was marked as outdated.

gemini-code-assist[bot]

This comment was marked as outdated.

@damccorm damccorm marked this pull request as draft June 12, 2026 19:47
…lose, preserve collation, and rename parameter
gemini-code-assist[bot]

This comment was marked as outdated.

gemini-code-assist[bot]

This comment was marked as outdated.

…l(RelNode, QueryParameters) and expose it in BeamSqlEnv
@damccorm damccorm force-pushed the feature/sql-api-extensions-only branch from 421b376 to fb42b31 Compare June 15, 2026 12:43
gemini-code-assist[bot]

This comment was marked as outdated.

@damccorm damccorm force-pushed the feature/sql-api-extensions-only branch from 399f90a to 097e41f Compare June 17, 2026 20:25
@damccorm

Copy link
Copy Markdown
Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request enhances the Beam SQL Calcite integration by exposing helper methods in BeamSqlEnv and QueryPlanner to parse SQL queries into logical RelNode trees and convert them to physical BeamRelNode trees, allowing logical plan rewriting. It also adds support for custom SQL conformance resolution (such as BABEL for Spark-SQL syntax) and makes RowToCsv public. The review feedback recommends removing the contradictory @VisibleForTesting annotation from the newly public RowToCsv class, adding defensive null checks to several new public methods, clarifying error messages for query parameters, and importing SqlOperatorTable to avoid using its fully qualified name in method signatures.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +240 to +241
@VisibleForTesting
static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
public static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since RowToCsv is now being made public specifically to allow external integration with text table serialization, keeping the @VisibleForTesting annotation is contradictory and misleading. It should be removed so that users and static analysis tools do not treat it as an internal-only API.

  public static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>

Comment on lines +152 to +155
public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable
getOperatorTable() {
return planner.getOperatorTable();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using the fully qualified name org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable in the method signature reduces readability. Since other Calcite classes are imported, we should use the simple name SqlOperatorTable instead.

  public SqlOperatorTable getOperatorTable() {
    return planner.getOperatorTable();
  }

…mRel catch blocks, optimize bindParameters, add null checks in parseToRel and convertToBeamRel, add null checks in registerSchemaFunction and use simple name for SqlOperatorTable, remove @VisibleForTesting from RowToCsv
@damccorm

Copy link
Copy Markdown
Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces helper methods in BeamSqlEnv, QueryPlanner, and CalciteQueryPlanner to allow parsing SQL queries into logical plans (RelNode) and converting those logical plans into Beam physical plans (BeamRelNode). It also resolves an issue where the Calcite conformance property was dropped by the Avatica JDBC path by reading it directly from pipeline options. Feedback on these changes highlights two main improvements: clearing the ThreadLocal RelMetadataQuery.THREAD_PROVIDERS in a finally block to prevent memory leaks, and optimizing the bindParameters method to avoid redundant node copies when child inputs do not change.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines 395 to 397
} finally {
planner.close();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The RelMetadataQuery.THREAD_PROVIDERS is a ThreadLocal variable that is set at the beginning of convertToBeamRel but never cleared. In long-running environments or thread pools, failing to clear ThreadLocal variables can lead to classloader memory leaks because the metadata provider holds references to the RelOptCluster and other heavy objects. We should remove the thread-local value in the finally block.

Suggested change
} finally {
planner.close();
}
} finally {
RelMetadataQuery.THREAD_PROVIDERS.remove();
planner.close();
}

Comment on lines +282 to +295
private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
RelNode newRel = rel.accept(binder);
java.util.List<RelNode> inputs = newRel.getInputs();
java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
boolean changed = newRel != rel;
for (RelNode input : inputs) {
RelNode newInput = bindParameters(input, binder);
newInputs.add(newInput);
if (newInput != input) {
changed = true;
}
}
return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In bindParameters, calling newRel.copy(...) when only the row expressions of newRel changed (but none of its child inputs changed) is redundant because newRel already contains the updated expressions and the original inputs. We should only perform a copy if the child inputs actually changed (inputsChanged is true). This avoids unnecessary node copies during parameter binding.

Suggested change
private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
RelNode newRel = rel.accept(binder);
java.util.List<RelNode> inputs = newRel.getInputs();
java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
boolean changed = newRel != rel;
for (RelNode input : inputs) {
RelNode newInput = bindParameters(input, binder);
newInputs.add(newInput);
if (newInput != input) {
changed = true;
}
}
return changed ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
}
private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
RelNode newRel = rel.accept(binder);
java.util.List<RelNode> inputs = newRel.getInputs();
java.util.List<RelNode> newInputs = new java.util.ArrayList<>(inputs.size());
boolean inputsChanged = false;
for (RelNode input : inputs) {
RelNode newInput = bindParameters(input, binder);
newInputs.add(newInput);
if (newInput != input) {
inputsChanged = true;
}
}
return inputsChanged ? newRel.copy(newRel.getTraitSet(), newInputs) : newRel;
}

… during translation

Remove early cleanup of THREAD_PROVIDERS in convertToBeamRel(RelNode, @nullable RelCollation) to prevent NullPointerException: metadataHandlerProvider when nodes query metadata during pipeline translation/assembly.

TAG=agy
CONV=0df243da-2867-4795-9889-6334ba7d1599
@damccorm damccorm force-pushed the feature/sql-api-extensions-only branch from 85ed76b to d9b8938 Compare June 17, 2026 22:28
damccorm added 2 commits June 18, 2026 14:53
… variable assignment

Fix compileJava issues for Java 26 environments by adding a fallback to JDK 21 in the gradlew script. Remove assignment of non-existent variable previousThreadProviders in CalciteQueryPlanner.java to fix branch compilation.

TAG=agy
CONV=0df243da-2867-4795-9889-6334ba7d1599
Set maxParallelForks = 1 for all Test tasks when running on Java 17+. On Java 17+, running tests in parallel can corrupt the binary test results output store (causing EOFException/Buffer underflow crashes during report generation). Running tests sequentially on newer JDKs fixes this.

TAG=agy
CONV=0df243da-2867-4795-9889-6334ba7d1599
@github-actions github-actions Bot added the build label Jun 18, 2026
Undo recent changes to gradlew and BeamModulePlugin.groovy, allowing builds to be configured through standard environment variables (e.g. JAVA_HOME) or gradle properties instead of modifying the repository's files directly.

TAG=agy
CONV=0df243da-2867-4795-9889-6334ba7d1599
@github-actions github-actions Bot removed the build label Jun 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant